pull: Add queuing into the higher level logic
author Colin Walters <walters@verbum.org>
Thu, 19 Jan 2017 02:57:07 +0000 (21:57 -0500)
committer Atomic Bot <atomic-devel@projectatomic.io>
Tue, 7 Feb 2017 19:59:40 +0000 (19:59 +0000)
Working on the libcurl backend, I didn't want to reimplement another queue. I
think the queue logic is really better done at the high level, since the fetcher
knows how we want to prioritize metadata over content, etc.

Adding another queue here is duplication, but things will look nicer when we can
actually delete the libsoup one in the next commit.

Closes: #654
Approved by: jlebon

src/libostree/ostree-repo-private.h
src/libostree/ostree-repo-pull.c

index cfc178f30de837dfd86600708a7305867fd65da6..73e024467025443299e374835e70a6c71f94cbc8 100644 (file)
@@ -32,6 +32,9 @@ G_BEGIN_DECLS
 #define _OSTREE_SUMMARY_CACHE_DIR "summaries"
 #define _OSTREE_CACHE_DIR "cache"
 
+#define _OSTREE_MAX_OUTSTANDING_FETCHER_REQUESTS 8
+#define _OSTREE_MAX_OUTSTANDING_DELTAPART_REQUESTS 2
+
 typedef enum {
   OSTREE_REPO_TEST_ERROR_PRE_COMMIT = (1 << 0)
 } OstreeRepoTestErrorFlags;
index 0adb65d4948b3c6dbf3e7926ee94924a4675f2ec..21f3007e3111c9da95d5d8ca4e9060dc5fd87851 100644 (file)
@@ -81,6 +81,9 @@ typedef struct {
   GHashTable       *scanned_metadata; /* Maps object name to itself */
   GHashTable       *requested_metadata; /* Maps object name to itself */
   GHashTable       *requested_content; /* Maps checksum to itself */
+  GHashTable       *pending_fetch_metadata; /* Map<ObjectName,FetchObjectData> */
+  GHashTable       *pending_fetch_content; /* Map<checksum,FetchObjectData> */
+  GHashTable       *pending_fetch_deltaparts; /* Set<FetchStaticDeltaData> */
   guint             n_outstanding_metadata_fetches;
   guint             n_outstanding_metadata_write_requests;
   guint             n_outstanding_content_fetches;
@@ -133,6 +136,10 @@ typedef struct {
   OtPullData  *pull_data;
   GVariant *objects;
   char *expected_checksum;
+  char *from_revision;
+  char *to_revision;
+  guint i;
+  guint64 size;
 } FetchStaticDeltaData;
 
 typedef struct {
@@ -142,6 +149,10 @@ typedef struct {
   guint recursion_depth;
 } ScanObjectQueueData;
 
+static void start_fetch (OtPullData *pull_data, FetchObjectData *fetch);
+static void start_fetch_deltapart (OtPullData *pull_data,
+                                   FetchStaticDeltaData *fetch);
+static gboolean fetcher_queue_is_full (OtPullData *pull_data);
 static void queue_scan_one_metadata_object (OtPullData         *pull_data,
                                             const char         *csum,
                                             OstreeObjectType    objtype,
@@ -271,6 +282,77 @@ check_outstanding_requests_handle_error (OtPullData          *pull_data,
           g_error_free (error);
         }
     }
+  else
+    {
+      GHashTableIter hiter;
+      gpointer key, value;
+
+      /* We may have just completed an async fetch operation. Now we look at
+       * possibly enqueuing more requests. The goal of queuing is both to avoid
+       * overloading the fetcher backend with HTTP requests and to prioritize
+       * metadata fetches over content, so we have accurate reporting. Hence
+       * here, we process metadata fetches first.
+       */
+
+      /* Try filling the queue with metadata we need to fetch */
+      g_hash_table_iter_init (&hiter, pull_data->pending_fetch_metadata);
+      while (!fetcher_queue_is_full (pull_data) &&
+             g_hash_table_iter_next (&hiter, &key, &value))
+        {
+          GVariant *objname = key;
+          FetchObjectData *fetch = value;
+
+          /* Steal both key and value */
+          g_hash_table_iter_steal (&hiter);
+
+          /* This takes ownership of the value */
+          start_fetch (pull_data, fetch);
+          /* And unref the key */
+          g_variant_unref (objname);
+        }
+
+      /* Now, process deltapart requests */
+      g_hash_table_iter_init (&hiter, pull_data->pending_fetch_deltaparts);
+      while (!fetcher_queue_is_full (pull_data) &&
+             g_hash_table_iter_next (&hiter, &key, &value))
+        {
+          FetchStaticDeltaData *fetch = key;
+          g_hash_table_iter_steal (&hiter);
+          /* Takes ownership */
+          start_fetch_deltapart (pull_data, fetch);
+        }
+
+      /* Next, fill the queue with content */
+      g_hash_table_iter_init (&hiter, pull_data->pending_fetch_content);
+      while (!fetcher_queue_is_full (pull_data) &&
+             g_hash_table_iter_next (&hiter, &key, &value))
+        {
+          char *checksum = key;
+          FetchObjectData *fetch = value;
+
+          /* Steal both key and value */
+          g_hash_table_iter_steal (&hiter);
+
+          /* This takes ownership of the value */
+          start_fetch (pull_data, fetch);
+          /* And free the key */
+          g_free (checksum);
+        }
+
+    }
+}
+
+/* We have a total-request limit, as well as a hardcoded max of 2 for delta
+ * parts. The logic for the delta one is that processing them is expensive, and
+ * doing multiple simultaneously could risk space/memory on smaller devices.
+ */
+static gboolean
+fetcher_queue_is_full (OtPullData *pull_data)
+{
+  return (pull_data->n_outstanding_metadata_fetches +
+          pull_data->n_outstanding_content_fetches +
+          pull_data->n_outstanding_deltapart_fetches) == _OSTREE_MAX_OUTSTANDING_FETCHER_REQUESTS ||
+    pull_data->n_outstanding_deltapart_fetches == _OSTREE_MAX_OUTSTANDING_DELTAPART_REQUESTS;
 }
 
 static gboolean
@@ -942,6 +1024,8 @@ fetch_static_delta_data_free (gpointer  data)
   FetchStaticDeltaData *fetch_data = data;
   g_free (fetch_data->expected_checksum);
   g_variant_unref (fetch_data->objects);
+  g_free (fetch_data->from_revision);
+  g_free (fetch_data->to_revision);
   g_free (fetch_data);
 }
 
@@ -1343,52 +1427,92 @@ enqueue_one_object_request (OtPullData        *pull_data,
                             gboolean           is_detached_meta,
                             gboolean           object_is_stored)
 {
-  g_autofree char *obj_subpath = NULL;
   gboolean is_meta;
   FetchObjectData *fetch_data;
-  guint64 *expected_max_size_p;
-  guint64 expected_max_size;
-  GPtrArray *mirrorlist = NULL;
 
-  g_debug ("queuing fetch of %s.%s%s", checksum,
-           ostree_object_type_to_string (objtype),
-           is_detached_meta ? " (detached)" : "");
+  is_meta = OSTREE_OBJECT_TYPE_IS_META (objtype);
+
+  fetch_data = g_new0 (FetchObjectData, 1);
+  fetch_data->pull_data = pull_data;
+  fetch_data->object = ostree_object_name_serialize (checksum, objtype);
+  fetch_data->path = g_strdup (path);
+  fetch_data->is_detached_meta = is_detached_meta;
+  fetch_data->object_is_stored = object_is_stored;
 
-  if (is_detached_meta)
+  if (is_meta)
+    pull_data->n_requested_metadata++;
+  else
+    pull_data->n_requested_content++;
+
+  /* Are too many requests in flight? */
+  if (fetcher_queue_is_full (pull_data))
     {
-      char buf[_OSTREE_LOOSE_PATH_MAX];
-      _ostree_loose_path (buf, checksum, OSTREE_OBJECT_TYPE_COMMIT_META, pull_data->remote_mode);
-      obj_subpath = g_build_filename ("objects", buf, NULL);
-      mirrorlist = pull_data->meta_mirrorlist;
+      g_debug ("queuing fetch of %s.%s%s", checksum,
+               ostree_object_type_to_string (objtype),
+               is_detached_meta ? " (detached)" : "");
+
+      if (is_meta)
+        {
+          GVariant *objname = ostree_object_name_serialize (checksum, objtype);
+          g_hash_table_insert (pull_data->pending_fetch_metadata, objname, fetch_data);
+        }
+      else
+        {
+          g_hash_table_insert (pull_data->pending_fetch_content, g_strdup (checksum), fetch_data);
+        }
     }
   else
     {
-      obj_subpath = _ostree_get_relative_object_path (checksum, objtype, TRUE);
-      mirrorlist = pull_data->content_mirrorlist;
+      start_fetch (pull_data, fetch_data);
     }
+}
+
+static void
+start_fetch (OtPullData *pull_data,
+             FetchObjectData *fetch)
+{
+  gboolean is_meta;
+  g_autofree char *obj_subpath = NULL;
+  guint64 *expected_max_size_p;
+  guint64 expected_max_size;
+  const char *expected_checksum;
+  OstreeObjectType objtype;
+  GPtrArray *mirrorlist = NULL;
+
+  ostree_object_name_deserialize (fetch->object, &expected_checksum, &objtype);
+  is_meta = OSTREE_OBJECT_TYPE_IS_META (objtype);
+
+  g_debug ("starting fetch of %s.%s%s", expected_checksum,
+           ostree_object_type_to_string (objtype),
+           fetch->is_detached_meta ? " (detached)" : "");
 
   is_meta = OSTREE_OBJECT_TYPE_IS_META (objtype);
   if (is_meta)
+    pull_data->n_outstanding_metadata_fetches++;
+  else
+    pull_data->n_outstanding_content_fetches++;
+
+  /* Override the path if we're trying to fetch the .commitmeta file first */
+  if (fetch->is_detached_meta)
     {
-      pull_data->n_outstanding_metadata_fetches++;
-      pull_data->n_requested_metadata++;
+      char buf[_OSTREE_LOOSE_PATH_MAX];
+      _ostree_loose_path (buf, expected_checksum, OSTREE_OBJECT_TYPE_COMMIT_META, pull_data->remote_mode);
+      obj_subpath = g_build_filename ("objects", buf, NULL);
+      mirrorlist = pull_data->meta_mirrorlist;
     }
   else
     {
-      pull_data->n_outstanding_content_fetches++;
-      pull_data->n_requested_content++;
+      obj_subpath = _ostree_get_relative_object_path (expected_checksum, objtype, TRUE);
+      mirrorlist = pull_data->content_mirrorlist;
     }
-  fetch_data = g_new0 (FetchObjectData, 1);
-  fetch_data->pull_data = pull_data;
-  fetch_data->object = ostree_object_name_serialize (checksum, objtype);
-  fetch_data->path = g_strdup (path);
-  fetch_data->is_detached_meta = is_detached_meta;
-  fetch_data->object_is_stored = object_is_stored;
 
-  expected_max_size_p = is_detached_meta ? NULL : g_hash_table_lookup (pull_data->expected_commit_sizes, checksum);
+  /* We may have determined maximum sizes from the summary file content; if so,
+   * honor it. Otherwise, metadata has a baseline max size.
+   */
+  expected_max_size_p = fetch->is_detached_meta ? NULL : g_hash_table_lookup (pull_data->expected_commit_sizes, expected_checksum);
   if (expected_max_size_p)
     expected_max_size = *expected_max_size_p;
-  else if (is_meta)
+  else if (OSTREE_OBJECT_TYPE_IS_META (objtype))
     expected_max_size = OSTREE_MAX_METADATA_SIZE;
   else
     expected_max_size = 0;
@@ -1398,7 +1522,7 @@ enqueue_one_object_request (OtPullData        *pull_data,
                                       is_meta ? OSTREE_REPO_PULL_METADATA_PRIORITY
                                       : OSTREE_REPO_PULL_CONTENT_PRIORITY,
                                       pull_data->cancellable,
-                                      is_meta ? meta_fetch_on_complete : content_fetch_on_complete, fetch_data);
+                                      is_meta ? meta_fetch_on_complete : content_fetch_on_complete, fetch);
 }
 
 static gboolean
@@ -1502,6 +1626,22 @@ process_one_static_delta_fallback (OtPullData   *pull_data,
   return ret;
 }
 
+static void
+start_fetch_deltapart (OtPullData *pull_data,
+                       FetchStaticDeltaData *fetch)
+{
+  g_autofree char *deltapart_path = _ostree_get_relative_static_delta_part_path (fetch->from_revision, fetch->to_revision, fetch->i);
+  pull_data->n_outstanding_deltapart_fetches++;
+  g_assert_cmpint (pull_data->n_outstanding_deltapart_fetches, <=, _OSTREE_MAX_OUTSTANDING_DELTAPART_REQUESTS);
+  _ostree_fetcher_request_to_tmpfile (pull_data->fetcher,
+                                      pull_data->content_mirrorlist,
+                                      deltapart_path, fetch->size,
+                                      OSTREE_FETCHER_DEFAULT_PRIORITY,
+                                      pull_data->cancellable,
+                                      static_deltapart_fetch_on_complete,
+                                      fetch);
+}
+
 static gboolean
 process_one_static_delta (OtPullData   *pull_data,
                           const char   *from_revision,
@@ -1652,9 +1792,13 @@ process_one_static_delta (OtPullData   *pull_data,
         continue;
       
       fetch_data = g_new0 (FetchStaticDeltaData, 1);
+      fetch_data->from_revision = g_strdup (from_revision);
+      fetch_data->to_revision = g_strdup (to_revision);
       fetch_data->pull_data = pull_data;
       fetch_data->objects = g_variant_ref (objects);
       fetch_data->expected_checksum = ostree_checksum_from_bytes_v (csum_v);
+      fetch_data->size = size;
+      fetch_data->i = i;
 
       if (inline_part_bytes != NULL)
         {
@@ -1678,14 +1822,12 @@ process_one_static_delta (OtPullData   *pull_data,
         }
       else
         {
-          _ostree_fetcher_request_to_tmpfile (pull_data->fetcher,
-                                              pull_data->content_mirrorlist,
-                                              deltapart_path, size,
-                                              OSTREE_FETCHER_DEFAULT_PRIORITY,
-                                              pull_data->cancellable,
-                                              static_deltapart_fetch_on_complete,
-                                              fetch_data);
-          pull_data->n_outstanding_deltapart_fetches++;
+          if (!fetcher_queue_is_full (pull_data))
+            start_fetch_deltapart (pull_data, fetch_data);
+          else
+            {
+              g_hash_table_add (pull_data->pending_fetch_deltaparts, fetch_data);
+            }
         }
     }
 
@@ -2446,6 +2588,14 @@ ostree_repo_pull_with_options (OstreeRepo             *self,
                                                         (GDestroyNotify)g_free, NULL);
   pull_data->requested_metadata = g_hash_table_new_full (ostree_hash_object_name, g_variant_equal,
                                                          (GDestroyNotify)g_variant_unref, NULL);
+  pull_data->pending_fetch_content = g_hash_table_new_full (g_str_hash, g_str_equal,
+                                                            (GDestroyNotify)g_free,
+                                                            (GDestroyNotify)fetch_object_data_free);
+  pull_data->pending_fetch_metadata = g_hash_table_new_full (ostree_hash_object_name, g_variant_equal,
+                                                             (GDestroyNotify)g_variant_unref,
+                                                             (GDestroyNotify)fetch_object_data_free);
+  pull_data->pending_fetch_deltaparts = g_hash_table_new_full (NULL, NULL, (GDestroyNotify)fetch_static_delta_data_free, NULL);
+
   if (dir_to_pull != NULL || dirs_to_pull != NULL)
     {
       pull_data->dirs = g_ptr_array_new_with_free_func (g_free);
@@ -3157,6 +3307,9 @@ ostree_repo_pull_with_options (OstreeRepo             *self,
   g_clear_pointer (&pull_data->summary_deltas_checksums, (GDestroyNotify) g_hash_table_unref);
   g_clear_pointer (&pull_data->requested_content, (GDestroyNotify) g_hash_table_unref);
   g_clear_pointer (&pull_data->requested_metadata, (GDestroyNotify) g_hash_table_unref);
+  g_clear_pointer (&pull_data->pending_fetch_content, (GDestroyNotify) g_hash_table_unref);
+  g_clear_pointer (&pull_data->pending_fetch_metadata, (GDestroyNotify) g_hash_table_unref);
+  g_clear_pointer (&pull_data->pending_fetch_deltaparts, (GDestroyNotify) g_hash_table_unref);
   g_clear_pointer (&pull_data->idle_src, (GDestroyNotify) g_source_destroy);
   g_clear_pointer (&pull_data->dirs, (GDestroyNotify) g_ptr_array_unref);
   g_clear_pointer (&remote_config, (GDestroyNotify) g_key_file_unref);